1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package storm.starter.tools;
19  
20  import org.testng.annotations.DataProvider;
21  import org.testng.annotations.Test;
22  
23  import java.util.Map;
24  
25  import static org.fest.assertions.api.Assertions.assertThat;
26  
27  public class SlidingWindowCounterTest {
28  
29    private static final int ANY_WINDOW_LENGTH_IN_SLOTS = 2;
30    private static final Object ANY_OBJECT = "ANY_OBJECT";
31  
32    @DataProvider
33    public Object[][] illegalWindowLengths() {
34      return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 }, { 1 } };
35    }
36  
37    @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalWindowLengths")
38    public void lessThanTwoSlotsShouldThrowIAE(int windowLengthInSlots) {
39      new SlidingWindowCounter<Object>(windowLengthInSlots);
40    }
41  
42    @DataProvider
43    public Object[][] legalWindowLengths() {
44      return new Object[][]{ { 2 }, { 3 }, { 20 } };
45    }
46  
47    @Test(dataProvider = "legalWindowLengths")
48    public void twoOrMoreSlotsShouldBeValid(int windowLengthInSlots) {
49      new SlidingWindowCounter<Object>(windowLengthInSlots);
50    }
51  
52    @Test
53    public void newInstanceShouldHaveEmptyCounts() {
54      // given
55      SlidingWindowCounter<Object> counter = new SlidingWindowCounter<Object>(ANY_WINDOW_LENGTH_IN_SLOTS);
56  
57      // when
58      Map<Object, Long> counts = counter.getCountsThenAdvanceWindow();
59  
60      // then
61      assertThat(counts).isEmpty();
62    }
63  
64    @DataProvider
65    public Object[][] simulatedCounterIterations() {
66      return new Object[][]{ { 2, new int[]{ 3, 2, 0, 0, 1, 0, 0, 0 }, new long[]{ 3, 5, 2, 0, 1, 1, 0, 0 } },
67          { 3, new int[]{ 3, 2, 0, 0, 1, 0, 0, 0 }, new long[]{ 3, 5, 5, 2, 1, 1, 1, 0 } },
68          { 4, new int[]{ 3, 2, 0, 0, 1, 0, 0, 0 }, new long[]{ 3, 5, 5, 5, 3, 1, 1, 1 } },
69          { 5, new int[]{ 3, 2, 0, 0, 1, 0, 0, 0 }, new long[]{ 3, 5, 5, 5, 6, 3, 1, 1 } },
70          { 5, new int[]{ 3, 11, 5, 13, 7, 17, 0, 3, 50, 600, 7000 },
71              new long[]{ 3, 14, 19, 32, 39, 53, 42, 40, 77, 670, 7653 } }, };
72    }
73  
74    @Test(dataProvider = "simulatedCounterIterations")
75    public void testCounterWithSimulatedRuns(int windowLengthInSlots, int[] incrementsPerIteration,
76        long[] expCountsPerIteration) {
77      // given
78      SlidingWindowCounter<Object> counter = new SlidingWindowCounter<Object>(windowLengthInSlots);
79      int numIterations = incrementsPerIteration.length;
80  
81      for (int i = 0; i < numIterations; i++) {
82        int numIncrements = incrementsPerIteration[i];
83        long expCounts = expCountsPerIteration[i];
84        // Objects are absent if they were zero both this iteration
85        // and the last -- if only this one, we need to report zero.
86        boolean expAbsent = ((expCounts == 0) && ((i == 0) || (expCountsPerIteration[i - 1] == 0)));
87  
88        // given (for this iteration)
89        for (int j = 0; j < numIncrements; j++) {
90          counter.incrementCount(ANY_OBJECT);
91        }
92  
93        // when (for this iteration)
94        Map<Object, Long> counts = counter.getCountsThenAdvanceWindow();
95  
96        // then (for this iteration)
97        if (expAbsent) {
98          assertThat(counts).doesNotContainKey(ANY_OBJECT);
99        }
100       else {
101         assertThat(counts.get(ANY_OBJECT)).isEqualTo(expCounts);
102       }
103     }
104   }
105 
106 }